Assignment 03

Author
Affiliation

Ryan Martin

Boston University

Published

September 24, 2025

Modified

September 22, 2025

from pyspark.sql import SparkSession
import pandas as pd
import plotly.express as px
import plotly.io as pio
pio.renderers.default = "svg"
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id

np.random.seed(42)

pio.renderers.default = "notebook"

# Initialize Spark Session
spark = SparkSession.builder.appName("./data/LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("./data/lightcast_job_postings.csv")

# Show Schema and Sample Data
#print("---This is Diagnostic check, No need to print it in the final doc---")

#df.printSchema() # comment this line when rendering the submission
#df.show(5)
WARNING: Using incubator modules: jdk.incubator.vector
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/22 04:31:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[Stage 1:>                                                          (0 + 1) / 1]                                                                                

1 1. Data Preperation

df = df.withColumn("SALARY", col("SALARY").cast("float")) \
      .withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
      .withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
      .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float")) \
      .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float")) \
      
def compute_median(sdf, col_name):
  q = sdf.approxQuantile(col_name, [0.5], 0.01)
  return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

df = df.fillna({
      "SALARY_FROM": median_from,
      "SALARY_TO": median_to,
      "SALARY": median_salary
})

df= df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)


export_cols = [
    "EMPLOYMENT_TYPE_NAME",
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "SALARY",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
] 

df_selected = df.select(*export_cols)

pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)


print("Data Cleaning Complete. Rows retained:", len(pdf))
[Stage 2:>                                                          (0 + 1) / 1]                                                                                [Stage 3:>                                                          (0 + 1) / 1]                                                                                [Stage 4:>                                                          (0 + 1) / 1]                                                                                [Stage 5:>                                                          (0 + 1) / 1]                                                                                
Medians: 87295.0 130042.0 115024.0
Data Cleaning Complete. Rows retained: 72498

2 2. Salary Distribution by Industry and Employment Type

import pandas as pd


pdf = df.filter(df["SALARY"] > 0).select("EMPLOYMENT_TYPE_NAME", "SALARY").toPandas()
import re
pdf["EMPLOYMENT_TYPE_NAME"] = pdf["EMPLOYMENT_TYPE_NAME"].apply(lambda x: re.sub(r"[^\x00-\x7F]+", "", x) if isinstance(x, str) else "")

pdf = pdf[pdf["EMPLOYMENT_TYPE_NAME"] != ""]

median_salaries = pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
median_salaries.head()

sorted_employment_types = median_salaries.sort_values(ascending=False).index

pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"],
    categories=sorted_employment_types,
    ordered=True
)
fig = px.box(
    pdf,
    x = "EMPLOYMENT_TYPE_NAME",
    y = "SALARY",
    title = "Salary Distribution by Employment Type",
    color_discrete_sequence=["black"],
    boxmode = "group",
    points = "all",
)

fig.update_layout(
    title=dict(
        text="Salary Distribution by Employment Type",
        font=dict(size=30, family="Arial", color="black", weight = "bold")
    ),
    
    xaxis=dict(
        title=dict(text="Employment Type", font=dict(size=14, family = "Arial", color = "black", weight = "bold")),
        tickangle=0, 
        tickfont=dict(size=12, family = "Arial", color = "black", weight = "bold"),
        showline = True,
        linewidth=2,
        linecolor="black",
        mirror=True,
        showgrid=False,
        categoryorder="array",
        categoryarray=sorted_employment_types.tolist()
    ),
    yaxis=dict(title=dict(text="Salary (K $)", font=dict(size=14, family = "Arial", color = "black", weight = "bold")),
        tickvals=[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000],
        ticktext=["0", "50K", "100K", "150K", "200K", "250K", "300K", "350K", "400K", "450K", "500K"],
        tickfont=dict(size=12, family = "Arial", color = "black", weight = "bold"),
        showline = True,
        linewidth=2,
        linecolor="black",
        mirror=True,
        showgrid=False,
        gridcolor="lightgray",
        gridwidth=0.5,
    ),
    font=dict(family="Arial", size = 12, color = "black"),
    boxgap=0.7,
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=500,
    width=800,


)



fig.show()
fig.write_html("output/Q1.html")
fig.write_image("output/Q1.svg", width=850, height=500, scale=1)
[Stage 6:>                                                          (0 + 1) / 1]